﻿/**
* CRL
*/
using CRL.Core;
using CRL.Core.Extension;
using CRL.RedisProvider;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace CRL.EventBus.Queue
{
    /// <summary>
    /// Configuration type for the Redis-backed queue; extends <see cref="ConfigBase"/>
    /// with one extra switch.
    /// </summary>
    public class RedisConfig: ConfigBase
    {
        /// <summary>
        /// NOTE(review): not read anywhere in this file; presumably selects the
        /// list-based queue implementation when true — confirm against the code
        /// that creates the queue.
        /// </summary>
        public bool UseList { get; set; }
    }
    /// <summary>
    /// Uses a Redis list as the queue, so that messages are not lost.
    /// </summary>
    class Redis2 : AbsQueue
    {
        IRedisClient client;
        ConfigBase _queueConfig;
        public override MQType MQType => MQType.Redis;
        List<ThreadWork> threads = new List<ThreadWork>();
        /// <summary>
        /// Creates the queue, keeping the supplied configuration and obtaining a
        /// Redis client from <c>RedisClientFactory</c>.
        /// </summary>
        /// <param name="queueConfig">Queue configuration stored for this instance.</param>
        public Redis2(ConfigBase queueConfig)
        {
            this._queueConfig = queueConfig;
            this.client = RedisClientFactory.GetCient();
        }
        /// <summary>
        /// Intentionally does nothing: this implementation releases no resources here.
        /// </summary>
        public override void Dispose()
        {
            // NOTE(review): the workers collected in `threads` by Subscribe are not
            // stopped here — confirm whether they should be.
        }
        /// <summary>
        /// Wraps every message in a <c>MsgPackage</c> and appends it, serialized as JSON,
        /// to the Redis list named <c>queue_{routingKey}</c>.
        /// </summary>
        /// <param name="routingKey">Routing key; also used to build the list key.</param>
        /// <param name="msgs">Messages to publish, pushed one at a time in enumeration order.</param>
        /// <param name="optFunc">Optional callback that adjusts the publish options.</param>
        public override void PublishList(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null)
        {
            var option = new PublishOption();
            if (optFunc != null)
            {
                optFunc(option);
            }

            // A delay given in seconds takes precedence over one given in milliseconds.
            DateTime dueTime;
            if (option.DelaySecond > 0)
            {
                dueTime = DateTime.Now.AddSeconds(option.DelaySecond);
            }
            else
            {
                dueTime = DateTime.Now.AddMilliseconds(option.DelayMs);
            }

            // Lazy projection: each package is built right before it is pushed.
            var packages = msgs.Select(payload => new MsgPackage
            {
                _InnerData = payload.ToJson(),
                Priority = option.Priority,
                RoutingKey = routingKey,
                Time = dueTime,
                DelayMs = option.DelaySecond > 0 ? option.DelaySecond * 1000 : option.DelayMs,
                MsgKey = option.MsgKey
            });

            var listKey = $"queue_{routingKey}";
            foreach (var package in packages)
            {
                var serialized = package.ToJson();
                client.ListRightPush(listKey, serialized);
            }
        }
        /// <summary>
        /// Task-returning wrapper around <c>PublishList</c>. The publish runs synchronously
        /// on the calling thread, and an already-completed task is returned afterwards.
        /// </summary>
        /// <param name="routingKey">Routing key forwarded to <c>PublishList</c>.</param>
        /// <param name="msgs">Messages forwarded to <c>PublishList</c>.</param>
        /// <param name="optFunc">Optional publish-option callback forwarded to <c>PublishList</c>.</param>
        /// <returns>A completed task.</returns>
        public override Task PublishListAsync(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null)
        {
            // Any exception from PublishList surfaces here directly, not through the task.
            this.PublishList(routingKey, msgs, optFunc);
            Task finished = Task.CompletedTask;
            return finished;
        }
        /// <summary>
        /// Appends a message to the tail of its queue again, with the retry counter
        /// increased by one and the routing key, priority, message key and time taken
        /// from <paramref name="data"/>.
        /// </summary>
        /// <param name="data">Envelope of the message being re-queued.</param>
        /// <param name="msg">Payload to serialize into the new package.</param>
        public override void RePublish(IData data, object msg)
        {
            var retryPackage = new MsgPackage
            {
                _InnerData = msg.ToJson(),
                RoutingKey = data.RoutingKey,
                RetryTimes = data.RetryTimes + 1,
                Priority = data.Priority,
                MsgKey = data.MsgKey,
                Time = data.Time
            };
            var listKey = $"queue_{data.RoutingKey}";
            var serialized = retryPackage.ToJson();
            client.ListRightPush(listKey, serialized);
        }


        /// <summary>
        /// Starts a <c>ThreadWork</c> that runs <c>SubscribeData</c> for the given event
        /// declaration and records it in <c>threads</c>.
        /// </summary>
        /// <param name="eventDeclare">Subscription to serve; passed to the worker as its argument.</param>
        public override void Subscribe(EventDeclare eventDeclare)
        {
            var worker = new ThreadWork { Args = eventDeclare };
            // Short pause before starting the worker. The original note calls this a
            // "random sleep", but the duration is a fixed 10 ms.
            System.Threading.Thread.Sleep(10);
            // NOTE(review): ThreadSleepSecond is presumably the interval between worker
            // runs — confirm against ThreadWork.Start.
            worker.Start("eventBusRedisSub", SubscribeData, eventDeclare.SubscribeAttribute.ThreadSleepSecond, true);
            threads.Add(worker);
        }
        /// <summary>
        /// Same as <c>Subscribe</c>; this queue has no separate asynchronous subscription path.
        /// </summary>
        /// <param name="eventDeclare">Subscription to serve.</param>
        public override void SubscribeAsync(EventDeclare eventDeclare) => Subscribe(eventDeclare);
        /// <summary>
        /// Synchronous worker callback: runs <c>SubscribeDataSync</c> and blocks until it finishes.
        /// </summary>
        /// <param name="args">Worker argument, forwarded unchanged.</param>
        /// <returns>Always <c>true</c>.</returns>
        bool SubscribeData(object args)
        {
            var pending = SubscribeDataSync(args);
            // Blocking wait; a failure is reported wrapped in an AggregateException.
            pending.Wait();
            return true;
        }
        /// <summary>
        /// Reads the Redis list <c>queue_{ed.Name}</c> in batches of
        /// <c>SubscribeAttribute.BatchSize</c> and dispatches the messages to the subscriber.
        /// </summary>
        /// <remarks>
        /// Array subscribers receive the whole batch in one call; the batch is trimmed from
        /// the list only when that call succeeds. Single-item subscribers receive each message
        /// separately and the batch is trimmed afterwards whatever the outcome. In both modes
        /// <c>CheckResult</c> is called for every message.
        /// A further batch is read only when the previous one was full and was removed from the
        /// list; a failed array batch therefore ends this run instead of being retried in a
        /// tight loop.
        /// NOTE(review): reading (ListRange) and removing (ListTrim) are separate calls, so
        /// several consumers on the same key could see the same items — confirm a single
        /// consumer per queue.
        /// </remarks>
        /// <param name="args">The <c>EventDeclare</c> describing the subscription.</param>
        /// <param name="msgKey">Not used by this implementation.</param>
        /// <returns>Always <c>true</c>.</returns>
        /// <exception cref="Exception">
        /// A payload cannot be converted to the subscribed type; the conversion error is
        /// attached as the inner exception.
        /// </exception>
        async Task<bool> SubscribeDataSync(object args, string msgKey = "")
        {
            var ed = args as EventDeclare;
            // Switch kept from the original code: when true, handler exceptions propagate
            // instead of being logged and counted as a failed invocation.
            bool throwEx = false;
            while (true)
            {
                var key = $"queue_{ed.Name}";
                var take = ed.SubscribeAttribute.BatchSize;
                // Materialize once so the batch is neither re-counted nor re-read below.
                var allData = client.ListRange(key, 0, take - 1).ToList();
                if (allData.Count == 0)
                {
                    return true;
                }
                // Deserialize each envelope exactly once; the same instances are used for
                // dispatching and for CheckResult.
                var msgs = allData.Select(b => b.ToObject<MsgPackage>()).ToList();

                var instance = ed.CreateServiceInstance();
                bool batchRemoved;
                if (ed.IsArray)
                {
                    var objInstance = DynamicMethodHelper.CreateCtorFuncFromCache(ed.EventDataType)();
                    var innerType = ed.EventDataType.GenericTypeArguments[0];
                    var method = ed.EventDataType.GetMethod("Add");
                    foreach (var m in msgs)
                    {
                        object item;
                        try
                        {
                            item = m._InnerData.ToObject(innerType);
                        }
                        catch (Exception ex)
                        {
                            throw new Exception($"发布和订阅类型冲突 {ex.Message} subName:{ed.Name} {innerType}=>{m._InnerData}", ex);
                        }
                        method.Invoke(objInstance, new object[] { item });
                    }
                    object result = true;
                    try
                    {
                        result = await getInvokeResult(ed, instance, objInstance);
                    }
                    catch (Exception ero)
                    {
                        if (throwEx)
                        {
                            // Rethrow without resetting the stack trace.
                            throw;
                        }
                        result = false;
                        Log(ed, ero.ToString());
                    }
                    var invokeSuccess = checkInvokeSuccess(result);
                    batchRemoved = invokeSuccess.invokeSuccess;
                    if (batchRemoved)
                    {
                        // Success: drop the processed items from the head of the list.
                        client.ListTrim(key, allData.Count, -1);
                    }
                    foreach (var m in msgs)
                    {
                        // NOTE(review): the payload is deserialized again rather than reusing the
                        // instance handed to the subscriber, presumably so CheckResult sees a copy
                        // the handler could not have modified — confirm before caching.
                        var item = m._InnerData.ToObject(innerType);
                        CheckResult(invokeSuccess, m, ed, item);
                    }
                }
                else
                {
                    foreach (var m in msgs)
                    {
                        object item;
                        try
                        {
                            item = m._InnerData.ToObject(ed.EventDataType);
                        }
                        catch (Exception ex)
                        {
                            throw new Exception($"发布和订阅类型冲突 {ex.Message} subName:{ed.Name} {ed.EventDataType}=>{m._InnerData}", ex);
                        }
                        object result = true;
                        try
                        {
                            result = await getInvokeResult(ed, instance, item);
                        }
                        catch (Exception ero)
                        {
                            if (throwEx)
                            {
                                // Rethrow without resetting the stack trace.
                                throw;
                            }
                            result = false;
                            Log(ed, ero.ToString());
                        }
                        var invokeSuccess = checkInvokeSuccess(result);
                        CheckResult(invokeSuccess, m, ed, item);
                    }
                    // Single-item mode always removes the batch once every message was dispatched.
                    client.ListTrim(key, allData.Count, -1);
                    batchRemoved = true;
                }

                // A short batch means the list is drained. A batch that was not removed would
                // just be read again, so stop and leave it for the next scheduled run.
                if (!batchRemoved || take != allData.Count)
                {
                    return true;
                }
            }
        }
        /// <summary>
        /// Returns the length of the Redis list <c>queue_{routingKey}</c>.
        /// </summary>
        /// <param name="routingKey">Routing key used to build the list key.</param>
        /// <returns>The value reported by the client's <c>ListLength</c> call.</returns>
        public override long GetQueueLength(string routingKey)
        {
            return client.ListLength($"queue_{routingKey}");
        }
        /// <summary>
        /// Removes every message from the Redis list <c>queue_{routingKey}</c>.
        /// </summary>
        /// <param name="routingKey">Routing key used to build the list key.</param>
        /// <returns>
        /// The list length read just before the list was emptied. The two calls are separate,
        /// so the figure can be off if messages arrive concurrently.
        /// NOTE(review): the previous code always returned 0 — confirm callers expect the
        /// number of removed messages.
        /// </returns>
        public override long CleanQueue(string routingKey)
        {
            var key = $"queue_{routingKey}";
            long pending = client.ListLength(key);
            // Trimming to the range 0..-1 keeps the whole list, so nothing was removed before.
            // A range whose start is greater than its stop is empty, which clears the list.
            client.ListTrim(key, 1, 0);
            return pending;
        }
    }
}
